Test with buffer
authorJeroen van der Heijden <jeroen@transceptor.technology>
Fri, 28 Sep 2018 08:48:49 +0000 (10:48 +0200)
committerJeroen van der Heijden <jeroen@transceptor.technology>
Fri, 28 Sep 2018 08:48:49 +0000 (10:48 +0200)
src/siri/db/buffer.c

index 5aac01dfcbbe12a2328af7c1c22f0d6e9de701bc..b6a0ab80e4dfde1bd24e4770776f4d6a42546737 100644 (file)
 /* when set to 1, no caching is done. 1 is the minimum value. */
 #define SIRIDB_BUFFER_CACHE 64
 
-static int BUFFER_create_new(siridb_t * siridb, siridb_series_t * series);
-static int BUFFER_use_empty(siridb_t * siridb, siridb_series_t * series);
+static int buffer__create_new(siridb_t * siridb, siridb_series_t * series);
+static int buffer__use_empty(siridb_t * siridb, siridb_series_t * series);
+static int buffer__write_start(siridb_t * siridb, siridb_series_t * series);
+static void buffer__migrate_to_new(char * pt);
 
-static const uint64_t BUFFER_end = 0xffffffffffffffff;
+/* buffer__start cannot conflict with a series_id since id 0 is never used */
+static const uint32_t buffer__start = 0x00000000;
+static const uint64_t buffer__end = 0xffffffffffffffff;
 
 
 /*
@@ -46,7 +50,7 @@ int siridb_buffer_write_empty(
                 SEEK_SET) ||
 
         /* write end ts */
-        fwrite( &BUFFER_end,
+        fwrite( &buffer__end,
                 sizeof(uint64_t),
                 1,
                 siridb->buffer_fp) != 1) ? EOF : 0;
@@ -72,10 +76,7 @@ int siridb_buffer_write_last_point(
 
     memcpy(buf, &point->ts, sizeof(uint64_t));
     memcpy(buf + sizeof(uint64_t), &point->val, sizeof(qp_via_t));
-    memcpy(
-        buf + sizeof(uint64_t) + sizeof(qp_via_t),
-        &BUFFER_end,
-        sizeof(uint64_t));
+    memcpy(buf + 16, &buffer__end, sizeof(uint64_t));
 
     return (
         /* jump to position where to write the new point */
@@ -100,8 +101,8 @@ int siridb_buffer_new_series(siridb_t * siridb, siridb_series_t * series)
     }
 
     return (siridb->empty_buffers->len) ?
-            BUFFER_use_empty(siridb, series) :
-            BUFFER_create_new(siridb, series);
+            buffer__use_empty(siridb, series) :
+            buffer__create_new(siridb, series);
 }
 
 int siridb_buffer_fsync(siridb_t * siridb)
@@ -126,6 +127,24 @@ int siridb_buffer_open(siridb_t * siridb)
     return 0;
 }
 
+static void buffer__migrate_to_new(char * pt)
+{
+    char * npt = pt;
+    uint32_t series_id = *((uint32_t *) pt);
+    pt += sizeof(uint32_t);
+    size_t num = *((size_t *) pt);
+    pt += sizeof(size_t);
+
+    memcpy(npt, &buffer__start, sizeof(uint32_t));
+    npt += sizeof(uint32_t);
+    memcpy(npt, &series_id, sizeof(uint32_t));
+    npt += sizeof(uint32_t);
+
+    memmove(npt, pt, num * 16);
+    npt += num * 16;
+    memcpy(npt, &buffer__end, sizeof(uint64_t));
+}
+
 /*
  * Returns 0 if successful or -1 in case of an error.
  * (signal might be raised)
@@ -135,11 +154,14 @@ int siridb_buffer_load(siridb_t * siridb)
     FILE * fp;
     FILE * fp_temp;
     size_t read_at_once = 8;
-    size_t num, i, j;
+    size_t num, i;
     char buffer[siridb->buffer_size * read_at_once];
     char * pt;
     long int offset = 0;
     siridb_series_t * series;
+    _Bool log_migrate = 1;
+    uint32_t buf_start, series_id;
+    uint64_t * ts;
 
     log_info("Loading and cleanup buffer");
 
@@ -182,11 +204,24 @@ int siridb_buffer_load(siridb_t * siridb)
     {
         for (i = 0; i < num; i++)
         {
-
             pt = buffer + i * siridb->buffer_size;
 
-            series = (siridb_series_t *)
-                    imap_get(siridb->series_map, *((uint32_t *) pt));
+            buf_start = *((uint32_t *) pt);
+            if (buf_start != buffer__start)
+            {
+                if (log_migrate)
+                {
+                    log_warning("Buffer will be migrated");
+                    log_migrate = 0;
+                }
+                buffer__migrate_to_new(pt);
+            }
+
+            pt += sizeof(uint32_t);
+            series_id = *((uint32_t *) pt);
+            pt += sizeof(uint32_t);
+
+            series = imap_get(siridb->series_map, series_id);
 
             if (series == NULL)
             {
@@ -206,16 +241,10 @@ int siridb_buffer_load(siridb_t * siridb)
 
             series->bf_offset = offset;
 
-            pt += sizeof(uint32_t);
-
-            for (   j = (size_t) *pt, pt += sizeof(size_t);
-                    j--;
-                    pt += 16)
+            for (; *(ts = (uint64_t *) pt) != buffer__end; pt += 16)
             {
-                siridb_points_add_point(
-                        series->buffer,
-                        (uint64_t *) pt,
-                        (qp_via_t *) (pt + 8));
+                qp_via_t * val = (qp_via_t *) (pt + 8);
+                siridb_points_add_point(series->buffer, ts, val);
             }
 
             offset += siridb->buffer_size;
@@ -248,6 +277,19 @@ int siridb_buffer_load(siridb_t * siridb)
     return 0;
 }
 
+static int buffer__write_start(siridb_t * siridb, siridb_series_t * series)
+{
+    const size_t sz = sizeof(uint32_t) + sizeof(uint32_t) + sizeof(uint64_t);
+    char buf[sz];
+
+    memcpy(buf, &buffer__start, sizeof(uint32_t));
+    memcpy(buf + sizeof(uint32_t), &series->id, sizeof(uint32_t));
+    memcpy(buf + sizeof(uint64_t), &buffer__end, sizeof(uint64_t));
+
+    /* write series ID and 0 length to buffer */
+    return (fwrite(buf, sz, 1, siridb->buffer_fp) == 1) ? 0 : -1;
+}
+
 /*
  * Reserve a space in the buffer for a new series. The position of this space
  * in the buffer is read from siridb->empty_buffers so this list must have
@@ -258,11 +300,8 @@ int siridb_buffer_load(siridb_t * siridb)
  * Note that an available spot must be checked before calling this function.
  * This functions has undefined behavior if no spot is found.
  */
-static int BUFFER_use_empty(siridb_t * siridb, siridb_series_t * series)
+static int buffer__use_empty(siridb_t * siridb, siridb_series_t * series)
 {
-    const size_t sz = sizeof(uint32_t) + sizeof(size_t);
-    char buf[sz];
-
     series->bf_offset = (long int) slist_pop(siridb->empty_buffers);
 
     /* jump to the correct buffer position */
@@ -272,11 +311,8 @@ static int BUFFER_use_empty(siridb_t * siridb, siridb_series_t * series)
         return -1;
     }
 
-    memcpy(buf, &series->id, sizeof(uint32_t));
-    memcpy(buf + sizeof(uint32_t), &series->buffer->len, sizeof(size_t));
-
     /* write series ID and 0 length to buffer */
-    if (fwrite(buf, sz, 1, siridb->buffer_fp) != 1)
+    if (buffer__write_start(siridb, series))
     {
         ERR_FILE
         return -1;
@@ -292,7 +328,7 @@ static int BUFFER_use_empty(siridb_t * siridb, siridb_series_t * series)
  *
  * Returns 0 if successful or -1 and a signal is raised in case of an error.
  */
-static int BUFFER_create_new(siridb_t * siridb, siridb_series_t * series)
+static int buffer__create_new(siridb_t * siridb, siridb_series_t * series)
 {
     long int buffer_pos;
     /* get file descriptor */
@@ -318,8 +354,8 @@ static int BUFFER_create_new(siridb_t * siridb, siridb_series_t * series)
         return -1;
     }
 
-    /* write series ID to buffer */
-    if (fwrite(&series->id, sizeof(uint32_t), 1, siridb->buffer_fp) != 1)
+    /* write buffer start and series ID to buffer */
+    if (buffer__write_start(siridb, series))
     {
         ERR_FILE
         return -1;